package com.hanxiaozhang.redisson.source;

import org.redisson.RedissonLockEntry;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

/**
 * Fair lock built on top of {@link MyRedissonLock}.
 * <p>
 * Besides the lock hash itself, the Lua scripts in this class maintain two extra Redis keys:
 * a list that holds the waiting threads in arrival order, and a sorted set that stores, for
 * every waiting thread, the timestamp (milliseconds) after which its queue entry is stale.
 * A thread may take the lock only when the lock hash does not exist and the thread is either
 * at the head of the queue or the queue is empty.
 *
 * @author hanxinghua
 * @create 2022/9/12
 * @since 1.0.0
 */
public class MyRedissonFairLock extends MyRedissonLock implements RLock {

    /**
     * Lua fragment shared by both acquire scripts. Expects KEYS[1] = lock hash,
     * KEYS[2] = waiting queue, KEYS[3] = timeout set, ARGV[1] = lease time in milliseconds,
     * ARGV[2] = lock name of the calling thread.
     * <p>
     * Returns nil from the script when the caller obtained the lock (first acquisition or
     * re-entry); otherwise execution falls through to whatever follows the fragment.
     */
    private static final String ACQUIRE_OR_REENTER_SCRIPT =
            // Lock is free AND (nobody is queued OR the caller is first in line): take it.
            "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
                    + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then "
                    // Pop the head of the waiting queue (the caller, or a no-op on an empty queue).
                    + "redis.call('lpop', KEYS[2]); "
                    // Remove the caller's entry from the timeout set.
                    + "redis.call('zrem', KEYS[3], ARGV[2]); "
                    // Create the lock hash with a hold count of 1 and set its lease.
                    + "redis.call('hset', KEYS[1], ARGV[2], 1); "
                    + "redis.call('pexpire', KEYS[1], ARGV[1]); "
                    + "return nil; "
                    + "end; "
                    // The caller already holds the lock: bump the hold count and renew the lease.
                    + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "
                    + "redis.call('hincrby', KEYS[1], ARGV[2], 1); "
                    + "redis.call('pexpire', KEYS[1], ARGV[1]); "
                    + "return nil; "
                    + "end; ";

    /**
     * How long, in milliseconds, a queued thread is given on top of the wait estimated for
     * the thread in front of it before its queue entry is treated as stale.
     */
    private final long threadWaitTime = 5000;

    /**
     * Command executor used to run the Lua scripts and to reach the subscribe service.
     */
    private final CommandAsyncExecutor commandExecutor;

    /**
     * Name of the Redis list holding the waiting threads, built with
     * {@code prefixName("redisson_lock_queue", name)} (per the original author's note this
     * looks like {@code redisson_lock_queue:{lockName}}).
     */
    private final String threadsQueueName;

    /**
     * Name of the Redis sorted set holding one member per waiting thread, built with
     * {@code prefixName("redisson_lock_timeout", name)}. The score of each member is a
     * millisecond timestamp.
     */
    private final String timeoutSetName;


    /**
     * Creates the fair lock and derives the names of its two auxiliary keys.
     *
     * @param commandExecutor executor used for every Redis call made by this lock
     * @param name            name of the lock
     */
    protected MyRedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        threadsQueueName = prefixName("redisson_lock_queue", name);
        timeoutSetName = prefixName("redisson_lock_timeout", name);
    }

    /**
     * Builds the Lua fragment that removes stale waiters from the head of the queue.
     * Expects KEYS[2] = waiting queue and KEYS[3] = timeout set.
     * <p>
     * A head entry is removed when its score is not later than "now". It is also removed when
     * it has no score at all: {@code zscore} then yields {@code false} inside Lua, and feeding
     * that into {@code tonumber(...) <= ...} would abort the script with a nil comparison.
     * Such an entry can never expire on its own, so it is treated as stale. After this
     * fragment has run, the queue is either empty or its head has a score later than "now".
     *
     * @param nowArgIndex index of the ARGV element that carries the current time in milliseconds
     * @return the Lua source of the cleanup loop
     */
    private static String removeStaleWaitersScript(int nowArgIndex) {
        return "while true do "
                // Peek at the head of the waiting queue.
                + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                // Empty queue: nothing left to clean.
                + "if firstThreadId2 == false then "
                + "break;"
                + "end; "
                // Score of the head; false when the timeout set has no such member.
                + "local timeout = redis.call('zscore', KEYS[3], firstThreadId2);"
                // Missing or elapsed score: drop the head from both structures and look again.
                + "if timeout == false or tonumber(timeout) <= tonumber(ARGV[" + nowArgIndex + "]) then "
                + "redis.call('zrem', KEYS[3], firstThreadId2); "
                + "redis.call('lpop', KEYS[2]); "
                // Head is still valid: stop, entries behind it are left untouched.
                + "else "
                + "break;"
                + "end; "
                + "end;";
    }

    /**
     * Looks up the pub/sub entry of one thread.
     * <p>
     * The entry key is finer-grained than a plain entry name: it is
     * {@code getEntryName() + ":" + threadId}.
     *
     * @param threadId id of the thread whose entry is requested
     * @return whatever {@code PUBSUB.getEntry} returns for that key
     */
    @Override
    protected RedissonLockEntry getEntry(long threadId) {
        return PUBSUB.getEntry(getEntryName() + ":" + threadId);
    }

    /**
     * Subscribes one thread to its own unlock channel.
     * <p>
     * The entry key is {@code getEntryName() + ":" + threadId}. The channel is
     * {@code getChannelName() + ":" + getLockName(threadId)}, which is the same form the unlock
     * scripts publish to (channel name, a colon, then the queue entry of the next waiter).
     *
     * @param threadId id of the subscribing thread
     * @return future of the subscription entry
     */
    @Override
    protected RFuture<RedissonLockEntry> subscribe(long threadId) {
        return PUBSUB.subscribe(getEntryName() + ":" + threadId,
                getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager().getSubscribeService());
    }

    /**
     * Cancels the subscription created by {@link #subscribe(long)}, using the same
     * per-thread entry key and per-thread channel name.
     *
     * @param future   future returned by {@link #subscribe(long)}; its current value is read
     *                 with {@code getNow()}
     * @param threadId id of the thread that subscribed
     */
    @Override
    protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
        PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId,
                getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager().getSubscribeService());
    }


    /**
     * Removes the given thread from the waiting structures after it gave up acquiring the lock.
     * <p>
     * When the thread was at the head of the queue, the score of every member of the timeout
     * set is first lowered by {@code threadWaitTime}. The thread is then removed from the
     * timeout set and from the queue.
     *
     * @param threadId id of the thread that failed to acquire the lock
     * @return future completed when the script has run
     */
    @Override
    protected RFuture<Void> acquireFailedAsync(long threadId) {

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE,
                RedisCommands.EVAL_VOID,
                // Head of the waiting queue.
                "local firstThreadId = redis.call('lindex', KEYS[1], 0); " +
                        // Only when the departing thread is the head ...
                        "if firstThreadId == ARGV[1] then " +
                        // ... fetch every member of the timeout set ...
                        "local keys = redis.call('zrange', KEYS[2], 0, -1); " +
                        "for i = 1, #keys, 1 do " +
                        // ... and lower its score by threadWaitTime.
                        "redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), keys[i]);" +
                        "end;" +
                        "end;" +
                        // Remove the thread from the timeout set.
                        "redis.call('zrem', KEYS[2], ARGV[1]); " +
                        // Remove every occurrence of the thread from the queue.
                        "redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
                // KEYS[1] = waiting queue, KEYS[2] = timeout set
                Arrays.<Object>asList(threadsQueueName, timeoutSetName),
                // ARGV[1] = lock name of the thread, ARGV[2] = threadWaitTime
                getLockName(threadId), threadWaitTime);
    }


    /**
     * Runs the acquire script for the given thread.
     * <p>
     * Two commands are supported:
     * <ul>
     * <li>{@code RedisCommands.EVAL_NULL_BOOLEAN}: the script returns nil when the lock was
     * obtained (or re-entered) and 1 otherwise; the thread is not queued.</li>
     * <li>{@code RedisCommands.EVAL_LONG}: the script returns nil when the lock was obtained
     * (or re-entered); otherwise the thread is added to the queue if it is not there yet and
     * the script returns the computed wait time in milliseconds.</li>
     * </ul>
     * As a side effect {@code internalLockLeaseTime} is set to the lease time in milliseconds.
     *
     * @param leaseTime lease time of the lock
     * @param unit      unit of {@code leaseTime}
     * @param threadId  id of the acquiring thread
     * @param command   either {@code EVAL_NULL_BOOLEAN} or {@code EVAL_LONG}
     * @param <T>       result type of {@code command}
     * @return future of the script result
     * @throws IllegalArgumentException when {@code command} is neither of the two supported commands
     */
    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        long currentTime = System.currentTimeMillis();
        if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // Drop stale waiters from the head of the queue (now = ARGV[3]).
                    removeStaleWaitersScript(3)
                            // Take or re-enter the lock when allowed; returns nil on success.
                            + ACQUIRE_OR_REENTER_SCRIPT
                            // Lock not obtained.
                            + "return 1;",
                    // KEYS[1] = lock hash, KEYS[2] = waiting queue, KEYS[3] = timeout set
                    Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
                    // ARGV[1] = lease time (ms), ARGV[2] = lock name of the thread, ARGV[3] = now
                    internalLockLeaseTime, getLockName(threadId), currentTime);
        }
        if (command == RedisCommands.EVAL_LONG) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // Drop stale waiters from the head of the queue (now = ARGV[4]).
                    removeStaleWaitersScript(4)
                            // Take or re-enter the lock when allowed; returns nil on success.
                            + ACQUIRE_OR_REENTER_SCRIPT
                            // Lock not obtained: estimate the wait and enqueue the caller.
                            + "local firstThreadId = redis.call('lindex', KEYS[2], 0); "
                            + "local ttl; "
                            // Somebody else is first in line: wait until that entry's timeout.
                            // The cleanup above guarantees the head has a score at this point.
                            + "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then "
                            + "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);"
                            // Queue is empty or the caller is first: wait for the lock's remaining lease.
                            + "else "
                            + "ttl = redis.call('pttl', KEYS[1]);"
                            + "end; "
                            // The caller's own timeout: estimated wait + (now + threadWaitTime).
                            + "local timeout = ttl + tonumber(ARGV[3]);"
                            // zadd returns 1 only for a new member, so the caller is appended to the
                            // queue once; for an existing member only the score is updated.
                            + "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then "
                            + "redis.call('rpush', KEYS[2], ARGV[2]);"
                            + "end; "
                            + "return ttl;",
                    // KEYS[1] = lock hash, KEYS[2] = waiting queue, KEYS[3] = timeout set
                    Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
                    // ARGV[1] = lease time (ms), ARGV[2] = lock name of the thread,
                    // ARGV[3] = now + threadWaitTime, ARGV[4] = now
                    internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
        }

        throw new IllegalArgumentException("Unsupported command: " + command);
    }

    /**
     * Runs the unlock script for the given thread.
     * <p>
     * Script results: 1 when the lock hash does not exist or was deleted by this call (the new
     * head of the queue, if any, is notified in both cases); nil when the lock exists but is
     * not held by this thread; 0 when the hold count was decremented but is still positive.
     * How these values map to the returned {@code Boolean} is decided by
     * {@code RedisCommands.EVAL_BOOLEAN}.
     *
     * @param threadId id of the unlocking thread
     * @return future of the script result
     */
    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // Drop stale waiters from the head of the queue (now = ARGV[4]).
                removeStaleWaitersScript(4)
                        // 1. The lock hash does not exist: just wake up the first waiter, if any.
                        + "if (redis.call('exists', KEYS[1]) == 0) then "
                        + "local nextThreadId = redis.call('lindex', KEYS[2], 0); "
                        + "if nextThreadId ~= false then "
                        // Publish the unlock message on the waiter's own channel.
                        + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); "
                        + "end; "
                        + "return 1; "
                        + "end;"
                        // 2. The lock exists but is not held by this thread.
                        + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "
                        + "return nil;"
                        + "end; "
                        // 3. Held by this thread: decrement the hold count.
                        + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "
                        // Still held re-entrantly: renew the lease and stop.
                        + "if (counter > 0) then "
                        + "redis.call('pexpire', KEYS[1], ARGV[2]); "
                        + "return 0; "
                        + "end; "
                        // Fully released: delete the lock hash and wake up the first waiter, if any.
                        + "redis.call('del', KEYS[1]); "
                        + "local nextThreadId = redis.call('lindex', KEYS[2], 0); "
                        + "if nextThreadId ~= false then "
                        + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); "
                        + "end; "
                        + "return 1; ",
                // KEYS[1] = lock hash, KEYS[2] = waiting queue, KEYS[3] = timeout set, KEYS[4] = channel name
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
                // ARGV[1] = unlock message, ARGV[2] = lease time (ms),
                // ARGV[3] = lock name of the thread, ARGV[4] = now
                LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
    }

    /**
     * Conditions are not supported by this lock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    /**
     * Deletes the lock hash, the waiting queue and the timeout set in one command.
     *
     * @return future of the {@code RedisCommands.DEL_OBJECTS} result
     */
    @Override
    public RFuture<Boolean> deleteAsync() {
        return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), threadsQueueName, timeoutSetName);
    }

    /**
     * Applies the same time-to-live to the lock hash, the waiting queue and the timeout set.
     * <p>
     * NOTE(review): the script result is the reply of the last {@code pexpire} only, i.e. the
     * one issued for the timeout set.
     *
     * @param timeToLive time-to-live to set
     * @param timeUnit   unit of {@code timeToLive}
     * @return future of the script result
     */
    @Override
    public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "redis.call('pexpire', KEYS[2], ARGV[1]); " +
                        "return redis.call('pexpire', KEYS[3], ARGV[1]); ",
                // KEYS[1] = lock hash, KEYS[2] = waiting queue, KEYS[3] = timeout set
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
                // ARGV[1] = time-to-live in milliseconds
                timeUnit.toMillis(timeToLive));
    }

    /**
     * Applies the same absolute expiry to the lock hash, the waiting queue and the timeout set.
     * <p>
     * NOTE(review): the script result is the reply of the last {@code pexpireat} only, i.e. the
     * one issued for the timeout set.
     *
     * @param timestamp value passed unchanged to {@code pexpireat}
     * @return future of the script result
     */
    @Override
    public RFuture<Boolean> expireAtAsync(long timestamp) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "redis.call('pexpireat', KEYS[1], ARGV[1]); " +
                        "redis.call('pexpireat', KEYS[2], ARGV[1]); " +
                        "return redis.call('pexpireat', KEYS[3], ARGV[1]); ",
                // KEYS[1] = lock hash, KEYS[2] = waiting queue, KEYS[3] = timeout set
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
                // ARGV[1] = expiry timestamp
                timestamp);
    }


    /**
     * Removes the expiry from the lock hash, the waiting queue and the timeout set.
     * <p>
     * NOTE(review): the script result is the reply of the last {@code persist} only, i.e. the
     * one issued for the timeout set.
     *
     * @return future of the script result
     */
    @Override
    public RFuture<Boolean> clearExpireAsync() {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "redis.call('persist', KEYS[1]); " +
                        "redis.call('persist', KEYS[2]); " +
                        "return redis.call('persist', KEYS[3]); ",
                // KEYS[1] = lock hash, KEYS[2] = waiting queue, KEYS[3] = timeout set
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName));
    }


    /**
     * Releases the lock regardless of which thread holds it.
     * <p>
     * Calls {@code cancelExpirationRenewal(null)} first, then runs a script that deletes the
     * lock hash. The script returns 1 when a lock hash was deleted (the first waiter, if any,
     * is notified) and 0 when there was nothing to delete.
     *
     * @return future of the script result
     */
    @Override
    public RFuture<Boolean> forceUnlockAsync() {
        cancelExpirationRenewal(null);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // Drop stale waiters from the head of the queue (now = ARGV[2]).
                removeStaleWaitersScript(2)
                        // del returns the number of removed keys: 1 means the lock existed.
                        + "if (redis.call('del', KEYS[1]) == 1) then "
                        // Wake up the first waiter, if any.
                        + "local nextThreadId = redis.call('lindex', KEYS[2], 0); "
                        + "if nextThreadId ~= false then "
                        + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); "
                        + "end; "
                        + "return 1; "
                        + "end; "
                        // There was no lock to delete.
                        + "return 0;",
                // KEYS[1] = lock hash, KEYS[2] = waiting queue, KEYS[3] = timeout set, KEYS[4] = channel name
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
                // ARGV[1] = unlock message, ARGV[2] = now
                LockPubSub.unlockMessage, System.currentTimeMillis());
    }

}
